package fun.easycode.datastream;

import cn.hutool.core.util.ReflectUtil;
import com.alibaba.otter.canal.protocol.CanalEntry;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;

/**
 * RowData 解析器
 *  转成自己需要的数据类型
 * @author xuzhen97
 */
public interface CanalRowDataParser<T> {
    /**
     * 解析
     * @param rowData RowData
     * @param eventType 事件类型
     * @param targetClass 目标类型
     * @return T 自定义类型
     */
    DataEntry<T> parse(CanalEntry.RowData rowData, CanalEntry.EventType eventType, Class<T> targetClass);

    /**
     * 默认实现
     * @param <T>
     */
    @Slf4j
    class DefaultCanalRowDataParser<T> implements CanalRowDataParser<T> {
        @Override
        public DataEntry<T> parse(CanalEntry.RowData rowData, CanalEntry.EventType eventType, Class<T> targetClass) {

            DataMode mode = null;
            List<CanalEntry.Column> columns = null;

            if (eventType == CanalEntry.EventType.DELETE) {
                columns = rowData.getBeforeColumnsList();
                mode = DataMode.CLEAN_UP;
            } else {
                columns = rowData.getAfterColumnsList();
                mode = DataMode.COPE;
            }

            T target = ReflectUtil.newInstance(targetClass);

            Map<String, Field> fieldMap = ReflectUtil.getFieldMap(targetClass);

            for (CanalEntry.Column column : columns){
                Field field = fieldMap.get(column.getName());
                if(field == null){
                    log.warn("字段 {} 不存在", column.getName());
                    continue;
                }
                ReflectUtil.setFieldValue(target, field, column.getValue());
            }

            return new DataEntry<>(target, mode);
        }
    }


}
